package pulsar

import (
	"context"
	"fmt"
	"time"

	"gitee.com/xfrm/middleware/xcontext"
	"gitee.com/xfrm/middleware/xlog"
	"gitee.com/xfrm/middleware/xmq/pub"
	"gitee.com/xfrm/middleware/xtime"
	"gitee.com/xfrm/middleware/xtrace"
	"github.com/apache/pulsar-client-go/pulsar"
)

// mqOpDurationLimit is the latency threshold for a single producer/consumer
// call made from this file: calls that take longer than this are logged
// with a "slow" message.
var mqOpDurationLimit = 10 * time.Millisecond

// WriteMsg publishes value to topic under key and waits for the producer's
// result.
//
// It opens a "pulsar.WriteMsg" tracing span, looks up the producer for topic
// on the lane derived from ctx, builds the payload from value with
// generatePayload and hands it to the producer. The time spent in the
// producer call is reported through pub.StatReqDuration, and calls slower
// than mqOpDurationLimit are logged.
//
// Errors from getProducer and generatePayload are wrapped with %w, so the
// underlying cause stays reachable through errors.Is / errors.As; in those
// cases the returned message ID is nil. The result of the producer's own
// WriteMsg is returned unchanged.
func (p *Manager) WriteMsg(ctx context.Context, topic string, key string, value interface{}) (pulsar.MessageID, error) {
	fun := "Manager.WriteMsg -->"

	span, ctx := xtrace.StartSpanFromContext(ctx, "pulsar.WriteMsg")
	defer span.Finish()

	conf := &instanceConf{
		lane:  xcontext.GetControlRouteGroupWithMasterName(ctx, pub.DefaultRouteGroup),
		role:  producerRoleType,
		topic: topic,
		group: "",
	}
	producer, err := p.getProducer(ctx, conf)
	if err != nil {
		return nil, fmt.Errorf("%s, getProducer topic: %s, err: %w", fun, topic, err)
	}

	payload, err := generatePayload(ctx, value)
	if err != nil {
		return nil, fmt.Errorf("%s, generatePayload topic: %s, err: %w", fun, topic, err)
	}

	// The timer starts only now, so the slow log and the metric cover the
	// producer call alone, not producer lookup or payload generation.
	st := xtime.NewTimeStat()
	defer func() {
		// dur is a time.Duration; %d prints it as an integer nanosecond count.
		if dur := st.Duration(); dur > mqOpDurationLimit {
			xlog.Infof(ctx, "%s slow topic:%s dur:%d", fun, topic, dur)
		}
		pub.StatReqDuration(ctx, producer.Topic(), "pulsar.WriteMsg", pub.TraceMessageBusTypePulsar, st.Millisecond())
	}()

	return producer.WriteMsg(ctx, key, payload)
}

// WriteAsyncMsg hands value to the producer's asynchronous write path for
// topic, under key, passing callback through to the producer.
//
// It opens a "pulsar.WriteAsyncMsg" tracing span, looks up the producer for
// topic on the lane derived from ctx and builds the payload from value with
// generatePayload. Calls to the producer that take longer than
// mqOpDurationLimit are logged.
//
// Errors from getProducer and generatePayload are wrapped with %w, so the
// underlying cause stays reachable through errors.Is / errors.As. The error
// returned by the producer's WriteAsyncMsg is returned unchanged.
//
// NOTE(review): callback is presumably invoked by the producer once the send
// completes; that contract lives in the producer, not here. Unlike WriteMsg,
// this method does not report a duration via pub.StatReqDuration.
func (p *Manager) WriteAsyncMsg(ctx context.Context, topic string, key string, value interface{},
	callback func(MessageID, *ProducerMessage, error)) error {
	fun := "Manager.WriteAsyncMsg -->"

	span, ctx := xtrace.StartSpanFromContext(ctx, "pulsar.WriteAsyncMsg")
	defer span.Finish()

	conf := &instanceConf{
		lane:  xcontext.GetControlRouteGroupWithMasterName(ctx, pub.DefaultRouteGroup),
		role:  producerRoleType,
		topic: topic,
	}
	producer, err := p.getProducer(ctx, conf)
	if err != nil {
		return fmt.Errorf("%s, getProducer topic: %s, err: %w", fun, topic, err)
	}

	payload, err := generatePayload(ctx, value)
	if err != nil {
		return fmt.Errorf("%s, generatePayload topic: %s, err: %w", fun, topic, err)
	}

	// Only the producer.WriteAsyncMsg call itself is timed.
	st := xtime.NewTimeStat()
	defer func() {
		// dur is a time.Duration; %d prints it as an integer nanosecond count.
		if dur := st.Duration(); dur > mqOpDurationLimit {
			xlog.Infof(ctx, "%s slow topic:%s dur:%d", fun, topic, dur)
		}
	}()

	return producer.WriteAsyncMsg(ctx, key, payload, callback)
}

// ReadMsg reads a message from the pulsar topic on behalf of groupId.
//
// It opens a "pulsar.ReadMsg" tracing span, looks up the consumer for
// topic/groupId on the lane derived from ctx and calls the consumer's
// ReadMsg with a Payload and value. The duration of that call is reported
// through pub.StatReqDuration whether or not it failed, and calls slower than
// mqOpDurationLimit are logged.
//
// Return values:
//   - on a consumer lookup or read error: the span-carrying ctx and an error
//     wrapping the cause with %w;
//   - when the received payload has an empty Value: the span-carrying ctx and
//     a nil error;
//   - otherwise: the context and error produced by parsePayload. Any span
//     found in that context is finished before returning.
//
// NOTE(review): value is presumably where the decoded message body ends up;
// that is decided by the consumer / parsePayload, not by this method.
func (p *Manager) ReadMsg(ctx context.Context, topic, groupId string, value interface{}) (context.Context, error) {
	fun := "Manager.ReadMsg -->"

	span, ctx := xtrace.StartSpanFromContext(ctx, "pulsar.ReadMsg")
	defer span.Finish()

	conf := &instanceConf{
		lane:  xcontext.GetControlRouteGroupWithMasterName(ctx, pub.DefaultRouteGroup),
		role:  consumerRoleType,
		topic: topic,
		group: groupId,
	}
	consumer, err := p.getConsumer(ctx, conf)
	if err != nil {
		return ctx, fmt.Errorf("%s, getConsumer topic: %s, err: %w", fun, topic, err)
	}

	var payload Payload
	st := xtime.NewTimeStat()

	err = consumer.ReadMsg(ctx, &payload, value)

	// dur is a time.Duration; %d prints it as an integer nanosecond count.
	if dur := st.Duration(); dur > mqOpDurationLimit {
		xlog.Infof(ctx, "%s slow topic:%s groupId:%s dur:%d", fun, topic, groupId, dur)
	}

	// The metric is recorded before the error check so failed reads are
	// counted as well.
	pub.StatReqDuration(ctx, consumer.topic, "pulsar.ReadMsg", pub.TraceMessageBusTypePulsar, st.Millisecond())
	if err != nil {
		return ctx, fmt.Errorf("%s, ReadPulsarMsg err: %w, topic: %s", fun, err, topic)
	}

	if len(payload.Value) == 0 {
		return ctx, nil
	}

	mctx, err := parsePayload(&payload, "pulsar.ReadMsg", value)
	if mctx == nil {
		// parsePayload's contract is not visible from here. If it yields no
		// context (presumably only alongside an error), do not feed nil into
		// SpanFromContext or hand a nil context to the caller; fall back to
		// ctx like the other error paths do.
		return ctx, err
	}
	if mspan := xtrace.SpanFromContext(mctx); mspan != nil {
		mspan.Finish()
	}
	return mctx, err
}

// FetchMsg fetches a message from the pulsar topic on behalf of groupId and
// returns the pub.AckHandler produced by the consumer alongside it.
//
// It opens a "pulsar.FetchMsg" tracing span, looks up the consumer for
// topic/groupId on the lane derived from ctx and calls the consumer's
// FetchMsg with a Payload and value. The duration of that call is reported
// through pub.StatReqDuration whether or not it failed, and calls slower than
// mqOpDurationLimit are logged.
//
// Return values:
//   - on a consumer lookup error: the span-carrying ctx, a nil handler and an
//     error wrapping the cause with %w;
//   - on a fetch error: the span-carrying ctx, whatever handler the consumer
//     returned and an error wrapping the cause with %w;
//   - when the received payload has an empty Value: the span-carrying ctx,
//     the handler and a nil error;
//   - otherwise: the context and error produced by parsePayload, plus the
//     handler. Any span found in that context is finished before returning.
//
// NOTE(review): the handler is presumably used by the caller to acknowledge
// the message; its semantics are defined by pub.AckHandler, not here.
func (p *Manager) FetchMsg(ctx context.Context, topic, groupId string, value interface{}) (context.Context, pub.AckHandler, error) {
	fun := "Manager.FetchMsg -->"

	span, ctx := xtrace.StartSpanFromContext(ctx, "pulsar.FetchMsg")
	defer span.Finish()

	conf := &instanceConf{
		lane:  xcontext.GetControlRouteGroupWithMasterName(ctx, pub.DefaultRouteGroup),
		role:  consumerRoleType,
		topic: topic,
		group: groupId,
	}
	consumer, err := p.getConsumer(ctx, conf)
	if err != nil {
		return ctx, nil, fmt.Errorf("%s, getConsumer topic: %s, err: %w", fun, topic, err)
	}

	var payload Payload
	st := xtime.NewTimeStat()

	handler, err := consumer.FetchMsg(ctx, &payload, value)

	// dur is a time.Duration; %d prints it as an integer nanosecond count.
	if dur := st.Duration(); dur > mqOpDurationLimit {
		xlog.Infof(ctx, "%s slow topic:%s groupId:%s dur:%d", fun, topic, groupId, dur)
	}

	// The metric is recorded before the error check so failed fetches are
	// counted as well.
	pub.StatReqDuration(ctx, consumer.topic, "pulsar.FetchMsg", pub.TraceMessageBusTypePulsar, st.Millisecond())
	if err != nil {
		return ctx, handler, fmt.Errorf("%s, FetchPulsarMsg err: %w, topic: %s", fun, err, topic)
	}

	if len(payload.Value) == 0 {
		return ctx, handler, nil
	}

	mctx, err := parsePayload(&payload, "pulsar.FetchMsg", value)
	if mctx == nil {
		// parsePayload's contract is not visible from here. If it yields no
		// context (presumably only alongside an error), do not feed nil into
		// SpanFromContext or hand a nil context to the caller; fall back to
		// ctx like the other error paths do.
		return ctx, handler, err
	}
	if mspan := xtrace.SpanFromContext(mctx); mspan != nil {
		mspan.Finish()
	}
	return mctx, handler, err
}

// SeekByTime moves the consumer identified by topic/groupId to the position
// given by t, by calling the consumer's SetOffsetAt.
//
// It opens a tracing span, looks up the consumer on the lane derived from
// ctx, performs the seek and reports the duration of the seek through
// pub.StatReqDuration whether or not it succeeded.
//
// It returns nil on success. Errors from getConsumer and SetOffsetAt are
// wrapped with %w, so the underlying cause stays reachable through
// errors.Is / errors.As.
//
// The span, metric and log prefix are labelled "SeekLatestByTime" rather than
// "SeekByTime"; they are runtime keys and are deliberately left untouched.
func (p *Manager) SeekByTime(ctx context.Context, topic, groupId string, t time.Time) error {
	fun := "Manager.SeekLatestByTime -->"

	span, ctx := xtrace.StartSpanFromContext(ctx, "pulsar.SeekLatestByTime")
	defer span.Finish()

	conf := &instanceConf{
		lane:  xcontext.GetControlRouteGroupWithMasterName(ctx, pub.DefaultRouteGroup),
		role:  consumerRoleType,
		topic: topic,
		group: groupId,
	}

	consumer, err := p.getConsumer(ctx, conf)
	if err != nil {
		return fmt.Errorf("%s, getConsumer topic: %s, err: %w", fun, topic, err)
	}

	st := xtime.NewTimeStat()
	err = consumer.SetOffsetAt(ctx, t)

	// Record the duration before the error check so failed seeks are counted
	// too, matching ReadMsg and FetchMsg.
	pub.StatReqDuration(ctx, consumer.topic, "pulsar.SeekLatestByTime", pub.TraceMessageBusTypePulsar, st.Millisecond())
	if err != nil {
		return fmt.Errorf("%s, SeekLatestByTime topic: %s, err: %w", fun, topic, err)
	}
	return nil
}
